from __future__ import annotations

import abc
import asyncio
import datetime
import threading
import uuid
import warnings
from contextlib import AsyncExitStack
from functools import partial
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Optional,
    Type,
)
from uuid import UUID, uuid4
from zoneinfo import ZoneInfo

import anyio
import anyio.abc
import httpx
from exceptiongroup import BaseExceptionGroup, ExceptionGroup
from importlib_metadata import (
    distributions,  # type: ignore[reportUnknownVariableType] incomplete typing
)
from pydantic import BaseModel, Field, PrivateAttr, field_validator
from pydantic.json_schema import GenerateJsonSchema
from typing_extensions import Literal, Self, TypeVar

import prefect
import prefect.types._datetime
from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning
from prefect._internal.schemas.validators import return_v_or_none
from prefect.client.base import ServerType
from prefect.client.orchestration import PrefectClient, get_client
from prefect.client.schemas.actions import WorkPoolCreate, WorkPoolUpdate
from prefect.client.schemas.objects import Flow as APIFlow
from prefect.client.schemas.objects import (
    Integration,
    StateType,
    WorkerMetadata,
    WorkPool,
)
from prefect.client.utilities import inject_client
from prefect.context import FlowRunContext, TagsContext
from prefect.events import Event, RelatedResource, emit_event
from prefect.events.related import object_as_related_resource, tags_as_related_resources
from prefect.exceptions import (
    Abort,
    ObjectNotFound,
)
from prefect.filesystems import LocalFileSystem
from prefect.futures import PrefectFlowRunFuture
from prefect.logging.loggers import (
    PrefectLogAdapter,
    flow_run_logger,
    get_worker_logger,
)
from prefect.plugins import load_prefect_collections
from prefect.settings import (
    PREFECT_API_URL,
    PREFECT_TEST_MODE,
    PREFECT_WORKER_HEARTBEAT_SECONDS,
    PREFECT_WORKER_PREFETCH_SECONDS,
    PREFECT_WORKER_QUERY_SECONDS,
    get_current_settings,
)
from prefect.states import (
    Cancelled,
    Crashed,
    Pending,
    exception_to_failed_state,
)
from prefect.tasks import Task
from prefect.types import KeyValueLabels
from prefect.utilities.dispatch import get_registry_for_type, register_base_type
from prefect.utilities.engine import propose_state
from prefect.utilities.services import (
    critical_service_loop,
    start_client_metrics_server,
    stop_client_metrics_server,
)
from prefect.utilities.slugify import slugify
from prefect.utilities.templating import (
    apply_values,
    resolve_block_document_references,
    resolve_variables,
)
from prefect.utilities.urls import url_for

if TYPE_CHECKING:
    from prefect.client.schemas.objects import FlowRun
    from prefect.client.schemas.responses import (
        DeploymentResponse,
        WorkerFlowRunResponse,
    )
    from prefect.flows import Flow


class BaseJobConfiguration(BaseModel):
    # Base model describing the infrastructure job a worker creates for a flow
    # run: the command to run, environment variables, labels and a name.
    #
    # NOTE: documented with `#` comments rather than a class docstring on
    # purpose; pydantic copies a model's docstring into the generated JSON
    # schema as its `description`, which would change
    # `BaseWorker.get_default_base_job_template()` output.
    command: Optional[str] = Field(
        default=None,
        description=(
            "The command to use when starting a flow run. "
            "In most cases, this should be left blank and the command "
            "will be automatically generated by the worker."
        ),
    )
    env: dict[str, Optional[str]] = Field(
        default_factory=dict,
        title="Environment Variables",
        description="Environment variables to set when starting a flow run.",
    )
    labels: dict[str, str] = Field(
        default_factory=dict,
        description=(
            "Labels applied to infrastructure created by the worker using "
            "this job configuration."
        ),
    )
    name: Optional[str] = Field(
        default=None,
        description=(
            "Name given to infrastructure created by the worker using this "
            "job configuration."
        ),
    )

    # Objects recorded by `prepare_for_flow_run` (deployment, flow, flow run) and
    # later turned into related resources by `_related_resources`.
    _related_objects: dict[str, Any] = PrivateAttr(default_factory=dict)

    @property
    def is_using_a_runner(self) -> bool:
        """Whether the configured command contains `prefect flow-run execute`."""
        return self.command is not None and "prefect flow-run execute" in self.command

    @field_validator("command")
    @classmethod
    def _coerce_command(cls, v: str | None) -> str | None:
        """Normalize the command through `return_v_or_none`."""
        return return_v_or_none(v)

    @field_validator("env", mode="before")
    @classmethod
    def _coerce_env(cls, v: dict[str, Any]) -> dict[str, str | None]:
        """Stringify every non-`None` environment value; `None` values are kept."""
        return {
            key: str(value) if value is not None else None for key, value in v.items()
        }

    @staticmethod
    def _get_base_config_defaults(variables: dict[str, Any]) -> dict[str, Any]:
        """Get default values from base config for all variables that have them."""
        defaults: dict[str, Any] = {}
        for variable_name, attrs in variables.items():
            # We remove `None` values because we don't want to use them in templating.
            # The current logic depends on keys not existing to populate the correct
            # value in some cases.
            # Pydantic will provide default values if the keys are missing when creating
            # a configuration class.
            if "default" in attrs and attrs.get("default") is not None:
                defaults[variable_name] = attrs["default"]

        return defaults

    @classmethod
    @inject_client
    async def from_template_and_values(
        cls,
        base_job_template: dict[str, Any],
        values: dict[str, Any],
        client: "PrefectClient | None" = None,
    ) -> Self:
        """Creates a valid worker configuration object from the provided base
        configuration and overrides.

        Important: this method expects that the base_job_template was already
        validated server-side.

        Args:
            base_job_template: A mapping with a `job_configuration` template and a
                `variables` schema. It is not modified by this method.
            values: Overrides that take precedence over the variable defaults
                declared in the `variables` schema.
            client: Client passed to block document and variable resolution.

        Returns:
            An instance of this configuration class built from the populated
            template.
        """
        # Work on a shallow copy: the `env` key may be reassigned below, and the
        # caller's template (e.g. a work pool's base job template that is reused
        # for several flow runs) must not accumulate values from earlier calls.
        base_config: dict[str, Any] = dict(base_job_template["job_configuration"])
        variables_schema = base_job_template["variables"]
        variables = cls._get_base_config_defaults(
            variables_schema.get("properties", {})
        )

        # merge variable defaults for `env` into base config before they're replaced by
        # deployment overrides
        if variables.get("env"):
            if isinstance(base_config.get("env"), dict):
                # Merge: preserve env vars from job_configuration
                base_config["env"] = base_config["env"] | variables.get("env")
            else:
                # Replace template with defaults
                base_config["env"] = variables.get("env")

        variables.update(values)

        # deep merge `env`
        if isinstance(base_config.get("env"), dict) and (
            deployment_env := variables.get("env")
        ):
            base_config["env"] = base_config.get("env") | deployment_env

        populated_configuration = apply_values(template=base_config, values=variables)
        populated_configuration = await resolve_block_document_references(
            template=populated_configuration, client=client
        )
        populated_configuration = await resolve_variables(
            template=populated_configuration, client=client
        )
        return cls(**populated_configuration)

    @classmethod
    def json_template(cls) -> dict[str, Any]:
        """Returns a dict with job configuration as keys and the corresponding templates as values

        Defaults to using the job configuration parameter name as the template variable name.

        e.g.
        ```python
        {
            key1: '{{ key1 }}',     # default variable template
            key2: '{{ template2 }}', # `template2` specifically provided as template
        }
        ```
        """
        configuration: dict[str, Any] = {}
        properties = cls.model_json_schema()["properties"]
        for k, v in properties.items():
            if v.get("template"):
                template = v["template"]
            else:
                template = "{{ " + k + " }}"
            configuration[k] = template

        return configuration

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: "DeploymentResponse | None" = None,
        flow: "APIFlow | None" = None,
        work_pool: "WorkPool | None" = None,
        worker_name: str | None = None,
    ) -> None:
        """
        Prepare the job configuration for a flow run.

        This method is called by the worker before starting a flow run. It
        should be used to set any configuration values that are dependent on
        the flow run.

        Args:
            flow_run: The flow run to be executed.
            deployment: The deployment that the flow run is associated with.
            flow: The flow that the flow run is associated with.
            work_pool: The work pool that the flow run is running in.
            worker_name: The name of the worker that is submitting the flow run.
        """

        self._related_objects = {
            "deployment": deployment,
            "flow": flow,
            "flow-run": flow_run,
        }

        # Later entries win: explicitly configured `env` overrides the flow-run
        # values, which override the base settings environment.
        env = {
            **self._base_environment(),
            **self._base_flow_run_environment(flow_run),
            **(self.env if isinstance(self.env, dict) else {}),  # pyright: ignore[reportUnnecessaryIsInstance]
        }
        # Drop variables whose final value is `None`.
        self.env = {key: value for key, value in env.items() if value is not None}
        # Same precedence rule as `env`: explicitly configured labels come last.
        self.labels = {
            **self._base_flow_run_labels(flow_run),
            **self._base_work_pool_labels(work_pool),
            **self._base_worker_name_label(worker_name),
            **self._base_flow_labels(flow),
            **self._base_deployment_labels(deployment),
            **self.labels,
        }
        self.name = self.name or flow_run.name
        self.command = self.command or self._base_flow_run_command()

    @staticmethod
    def _base_flow_run_command() -> str:
        """
        Generate a command for a flow run job.
        """
        return "prefect flow-run execute"

    @staticmethod
    def _base_flow_run_labels(flow_run: "FlowRun") -> dict[str, str]:
        """
        Generate a dictionary of labels for a flow run job.
        """
        return {
            "prefect.io/flow-run-id": str(flow_run.id),
            "prefect.io/flow-run-name": flow_run.name,
            "prefect.io/version": prefect.__version__,
        }

    @classmethod
    def _base_environment(cls) -> dict[str, str]:
        """
        Environment variables that should be passed to all created infrastructure.

        These values should be overridable with the `env` field.
        """
        return get_current_settings().to_environment_variables(exclude_unset=True)

    @staticmethod
    def _base_flow_run_environment(flow_run: "FlowRun | None") -> dict[str, str]:
        """
        Generate a dictionary of environment variables for a flow run job.
        """
        if flow_run is None:
            return {}

        return {"PREFECT__FLOW_RUN_ID": str(flow_run.id)}

    @staticmethod
    def _base_deployment_labels(
        deployment: "DeploymentResponse | None",
    ) -> dict[str, str]:
        """
        Generate deployment labels; empty when no deployment is given.

        The `prefect.io/deployment-updated` label is only added when the
        deployment has an `updated` timestamp, rendered as a UTC ISO 8601 string.
        """
        if deployment is None:
            return {}

        labels = {
            "prefect.io/deployment-id": str(deployment.id),
            "prefect.io/deployment-name": deployment.name,
        }
        if deployment.updated is not None:
            labels["prefect.io/deployment-updated"] = deployment.updated.astimezone(
                ZoneInfo("UTC")
            ).isoformat()
        return labels

    @staticmethod
    def _base_flow_labels(flow: "APIFlow | None") -> dict[str, str]:
        """Generate flow id/name labels; empty when no flow is given."""
        if flow is None:
            return {}

        return {
            "prefect.io/flow-id": str(flow.id),
            "prefect.io/flow-name": flow.name,
        }

    @staticmethod
    def _base_work_pool_labels(work_pool: "WorkPool | None") -> dict[str, str]:
        """Adds the work pool labels to the job manifest."""
        if work_pool is None:
            return {}

        return {
            "prefect.io/work-pool-name": work_pool.name,
            "prefect.io/work-pool-id": str(work_pool.id),
        }

    @staticmethod
    def _base_worker_name_label(worker_name: str | None) -> dict[str, str]:
        """Adds the worker name label to the job manifest."""
        if worker_name is None:
            return {}

        return {"prefect.io/worker-name": worker_name}

    def _related_resources(self) -> list[RelatedResource]:
        """
        Build related resources from the objects recorded by `prepare_for_flow_run`.

        Each non-`None` object becomes one related resource, and the tags of all
        objects exposing a `tags` attribute are collected (de-duplicated) and
        appended as tag resources.
        """
        tags: set[str] = set()
        related: list[RelatedResource] = []

        for kind, obj in self._related_objects.items():
            if obj is None:
                continue
            if hasattr(obj, "tags"):
                tags.update(obj.tags)
            related.append(object_as_related_resource(kind=kind, role=kind, object=obj))

        return related + tags_as_related_resources(tags)


class BaseVariables(BaseModel):
    # Base model for the job variables a work pool exposes; its JSON schema is
    # used as the `variables` section of a worker's default base job template.
    #
    # NOTE: documented with `#` comments rather than a class docstring on
    # purpose; pydantic copies a model's docstring into the generated JSON
    # schema as its `description`.
    name: Optional[str] = Field(
        default=None,
        description="Name given to infrastructure created by a worker.",
    )
    env: dict[str, Optional[str]] = Field(
        default_factory=dict,
        title="Environment Variables",
        description="Environment variables to set when starting a flow run.",
    )
    labels: dict[str, str] = Field(
        default_factory=dict,
        description="Labels applied to infrastructure created by a worker.",
    )
    command: Optional[str] = Field(
        default=None,
        description=(
            "The command to use when starting a flow run. "
            "In most cases, this should be left blank and the command "
            "will be automatically generated by the worker."
        ),
    )

    @classmethod
    def model_json_schema(
        cls,
        by_alias: bool = True,
        ref_template: str = "#/definitions/{model}",
        schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema,
        mode: Literal["validation", "serialization"] = "validation",
        *,
        union_format: Literal["any_of", "primitive_type_array"] = "any_of",
    ) -> dict[str, Any]:
        """Generate the JSON schema in the legacy `definitions` layout.

        Differences from the pydantic default: `$defs` is renamed to
        `definitions` (and references use `#/definitions/{model}` by default),
        and `additionalProperties` is removed from the top level and from every
        definition.

        TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

        Args:
            by_alias: Forwarded to pydantic.
            ref_template: Forwarded to pydantic.
            schema_generator: Forwarded to pydantic.
            mode: Forwarded to pydantic.
            union_format: Forwarded to pydantic only when it differs from the
                default, so that pydantic releases that do not know this
                keyword keep working for default calls.

        Returns:
            The adjusted JSON schema.
        """
        # Previously `union_format` was accepted but silently dropped. Forward it
        # only when the caller actually asked for a non-default format.
        extra_kwargs: dict[str, Any] = {}
        if union_format != "any_of":
            extra_kwargs["union_format"] = union_format

        schema = super().model_json_schema(
            by_alias, ref_template, schema_generator, mode, **extra_kwargs
        )

        # ensure backwards compatibility by moving $defs into definitions
        if "$defs" in schema:
            schema["definitions"] = schema.pop("$defs")

        # we aren't expecting these additional fields in the schema
        schema.pop("additionalProperties", None)

        for definition in schema.get("definitions", {}).values():
            if "additionalProperties" in definition:
                definition.pop("additionalProperties")

        return schema


class BaseWorkerResult(BaseModel, abc.ABC):
    # Result of running a flow run on infrastructure: an identifier for the
    # created infrastructure and the status code it finished with.
    identifier: str
    status_code: int

    def __bool__(self) -> bool:
        """A result is truthy only when its status code is zero."""
        finished_cleanly = self.status_code == 0
        return finished_cleanly


# Type parameters of `BaseWorker`: the job configuration model (C), the job
# variables model (V) and the result model returned by `run` (R).
C = TypeVar("C", bound=BaseJobConfiguration)
V = TypeVar("V", bound=BaseVariables)
R = TypeVar("R", bound=BaseWorkerResult)
FR = TypeVar("FR")  # used to capture the return type of a flow


@register_base_type
class BaseWorker(abc.ABC, Generic[C, V, R]):
    # Identifier of the worker type.
    # NOTE(review): presumably the key under which subclasses are registered and
    # looked up in `get_worker_class_from_type` - confirm against `__dispatch_key__`.
    type: str
    # Model used to build job configurations and the default job template.
    job_configuration: Type[C] = BaseJobConfiguration  # type: ignore
    # Optional model whose JSON schema describes the job variables; when `None`
    # the schema is derived from `job_configuration` instead.
    job_configuration_variables: Optional[Type[V]] = None

    # Static metadata returned by `get_documentation_url`, `get_logo_url` and
    # `get_description`.
    _documentation_url = ""
    _logo_url = ""
    _description = ""

    def __init__(
        self,
        work_pool_name: str,
        work_queues: list[str] | None = None,
        name: str | None = None,
        prefetch_seconds: float | None = None,
        create_pool_if_not_found: bool = True,
        limit: int | None = None,
        heartbeat_interval_seconds: int | None = None,
        *,
        base_job_template: dict[str, Any] | None = None,
    ):
        """
        Base class for all Prefect workers.

        Args:
            name: The name of the worker. If not provided, a random one
                will be generated. If provided, it cannot contain '/' or '%'.
                The name is used to identify the worker in the UI; if two
                processes have the same name, they will be treated as the same
                worker.
            work_pool_name: The name of the work pool to poll.
            work_queues: A list of work queues to poll. If not provided, all
                work queue in the work pool will be polled.
            prefetch_seconds: The number of seconds to prefetch flow runs for.
            create_pool_if_not_found: Whether to create the work pool
                if it is not found. Defaults to `True`, but can be set to `False` to
                ensure that work pools are not created accidentally.
            limit: The maximum number of flow runs this worker should be running at
                a given time.
            heartbeat_interval_seconds: The number of seconds between worker heartbeats.
            base_job_template: If creating the work pool, provide the base job
                template to use. Logs a warning if the pool already exists.
        """
        if name and ("/" in name or "%" in name):
            raise ValueError("Worker name cannot contain '/' or '%'")
        self.name: str = name or f"{self.__class__.__name__} {uuid4()}"
        self._started_event: Optional[Event] = None
        self.backend_id: Optional[UUID] = None
        self._logger = get_worker_logger(self)

        self.is_setup = False
        self._create_pool_if_not_found = create_pool_if_not_found
        self._base_job_template = base_job_template
        self._work_pool_name = work_pool_name
        self._work_queues: set[str] = set(work_queues) if work_queues else set()

        self._prefetch_seconds: float = (
            prefetch_seconds or PREFECT_WORKER_PREFETCH_SECONDS.value()
        )
        self.heartbeat_interval_seconds: int = (
            heartbeat_interval_seconds or PREFECT_WORKER_HEARTBEAT_SECONDS.value()
        )

        self._work_pool: Optional[WorkPool] = None
        self._exit_stack: AsyncExitStack = AsyncExitStack()
        self._runs_task_group: Optional[anyio.abc.TaskGroup] = None
        self._client: Optional[PrefectClient] = None
        self._last_polled_time: datetime.datetime = prefect.types._datetime.now("UTC")
        self._limit = limit
        self._limiter: Optional[anyio.CapacityLimiter] = None
        self._submitting_flow_run_ids: set[UUID] = set()
        self._cancelling_flow_run_ids: set[UUID] = set()
        self._scheduled_task_scopes: set[anyio.CancelScope] = set()
        self._worker_metadata_sent = False

    @property
    def client(self) -> PrefectClient:
        if self._client is None:
            raise RuntimeError(
                "Worker has not been correctly initialized. Please use the worker class as an async context manager."
            )
        return self._client

    @property
    def work_pool(self) -> WorkPool:
        if self._work_pool is None:
            raise RuntimeError(
                "Worker has not been correctly initialized. Please use the worker class as an async context manager."
            )
        return self._work_pool

    @property
    def limiter(self) -> anyio.CapacityLimiter:
        if self._limiter is None:
            raise RuntimeError(
                "Worker has not been correctly initialized. Please use the worker class as an async context manager."
            )
        return self._limiter

    @classmethod
    def get_documentation_url(cls) -> str:
        return cls._documentation_url

    @classmethod
    def get_logo_url(cls) -> str:
        return cls._logo_url

    @classmethod
    def get_description(cls) -> str:
        return cls._description

    @classmethod
    def get_default_base_job_template(cls) -> dict[str, Any]:
        if cls.job_configuration_variables is None:
            schema = cls.job_configuration.model_json_schema()
            # remove "template" key from all dicts in schema['properties'] because it is not a
            # relevant field
            for key, value in schema["properties"].items():
                if isinstance(value, dict):
                    schema["properties"][key].pop("template", None)
            variables_schema = schema
        else:
            variables_schema = cls.job_configuration_variables.model_json_schema()
        variables_schema.pop("title", None)
        return {
            "job_configuration": cls.job_configuration.json_template(),
            "variables": variables_schema,
        }

    @staticmethod
    def get_worker_class_from_type(
        type: str,
    ) -> Optional[Type["BaseWorker[Any, Any, Any]"]]:
        """
        Look up the worker class registered for a worker type.

        Prefect collections are loaded before the registry is consulted.
        Returns None when there is no worker registry or the type is not in it.
        """
        load_prefect_collections()
        registry = get_registry_for_type(BaseWorker)
        if registry is None:
            return None
        return registry.get(type)

    @staticmethod
    def get_all_available_worker_types() -> list[str]:
        """
        List every worker type present in the local registry.

        Prefect collections are loaded before the registry is consulted. An
        empty list is returned when there is no worker registry.
        """
        load_prefect_collections()
        registry = get_registry_for_type(BaseWorker)
        return [] if registry is None else list(registry.keys())

    def get_name_slug(self) -> str:
        return slugify(self.name)

    def get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter:
        extra = {
            "worker_name": self.name,
            "work_pool_name": (
                self._work_pool_name if self._work_pool else "<unknown>"
            ),
            "work_pool_id": str(getattr(self._work_pool, "id", "unknown")),
        }
        if self.backend_id:
            extra["worker_id"] = str(self.backend_id)

        return flow_run_logger(flow_run=flow_run).getChild(
            "worker",
            extra=extra,
        )

    async def start(
        self,
        run_once: bool = False,
        with_healthcheck: bool = False,
        printer: Callable[..., None] = print,
    ) -> None:
        """
        Starts the worker and runs the main worker loops.

        By default, the worker will run loops to poll for scheduled/cancelled flow
        runs and sync with the Prefect API server.

        If `run_once` is set, the worker will only run each loop once and then return.

        If `with_healthcheck` is set, the worker will start a healthcheck server which
        can be used to determine if the worker is still polling for flow runs and restart
        the worker if necessary.

        Args:
            run_once: If set, the worker will only run each loop once then return.
            with_healthcheck: If set, the worker will start a healthcheck server.
            printer: A `print`-like function where logs will be reported.
        """
        # Tracked outside the `try` so the `finally` block can tell whether a
        # healthcheck server was actually started.
        healthcheck_server = None
        healthcheck_thread = None
        try:
            async with self as worker:
                # schedule the scheduled flow run polling loop
                async with anyio.create_task_group() as loops_task_group:
                    loops_task_group.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.get_and_submit_flow_runs,
                            interval=PREFECT_WORKER_QUERY_SECONDS.value(),
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,  # Up to ~1 minute interval during backoff
                        )
                    )
                    # schedule the sync loop
                    loops_task_group.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.sync_with_backend,
                            interval=self.heartbeat_interval_seconds,
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,
                        )
                    )

                    self._started_event = await self._emit_worker_started_event()

                    start_client_metrics_server()

                    if with_healthcheck:
                        # Imported lazily, only when a healthcheck is requested.
                        from prefect.workers.server import build_healthcheck_server

                        # we'll start the ASGI server in a separate thread so that
                        # uvicorn does not block the main thread
                        healthcheck_server = build_healthcheck_server(
                            worker=worker,
                            query_interval_seconds=PREFECT_WORKER_QUERY_SECONDS.value(),
                        )
                        healthcheck_thread = threading.Thread(
                            name="healthcheck-server-thread",
                            target=healthcheck_server.run,
                            daemon=True,
                        )
                        healthcheck_thread.start()
                    printer(f"Worker {worker.name!r} started!")
                    # Leaving the task group block waits for both loops to return.

                # If running once, wait for active runs to finish before teardown
                if run_once and self._limiter:
                    # Use the limiter's borrowed token count as the source of truth
                    while self.limiter.borrowed_tokens > 0:
                        self._logger.debug(
                            "Waiting for %s active run(s) to finish before shutdown...",
                            self.limiter.borrowed_tokens,
                        )
                        await anyio.sleep(0.1)
        finally:
            # Runs on both normal exit and error so the metrics server and the
            # healthcheck thread are always shut down.
            stop_client_metrics_server()

            if healthcheck_server and healthcheck_thread:
                self._logger.debug("Stopping healthcheck server...")
                # Signal the server to exit, then block until its thread finishes.
                healthcheck_server.should_exit = True
                healthcheck_thread.join()
                self._logger.debug("Healthcheck server stopped.")

        # `worker` was bound by the `async with self as worker` statement above.
        printer(f"Worker {worker.name!r} stopped!")

    @abc.abstractmethod
    async def run(
        self,
        flow_run: "FlowRun",
        configuration: C,
        task_status: Optional[anyio.abc.TaskStatus[int]] = None,
    ) -> R:
        """
        Runs a given flow run on the current worker.
        """
        raise NotImplementedError(
            "Workers must implement a method for running submitted flow runs"
        )

    async def _initiate_run(
        self,
        flow_run: "FlowRun",
        configuration: C,
    ) -> None:
        """
        This method is called by the worker to initiate a flow run and should return as
        soon as possible.

        This method is used in `.submit` to allow non-blocking submission of flows. For
        workers that wait for completion in their `run` method, this method should be
        implemented to return immediately.

        If this method is not implemented, `.submit` will fall back to the `.run` method.
        """
        raise NotImplementedError(
            "This worker has not implemented `_initiate_run`. Please use `run` instead."
        )

    async def submit(
        self,
        flow: "Flow[..., FR]",
        parameters: dict[str, Any] | None = None,
        job_variables: dict[str, Any] | None = None,
    ) -> "PrefectFlowRunFuture[FR]":
        """
        EXPERIMENTAL: The interface for this method is subject to change.

        Submits a flow to run via the worker.

        Args:
            flow: The flow to submit
            parameters: The parameters to pass to the flow

        Returns:
            A flow run object
        """
        warnings.warn(
            "Ad-hoc flow submission via workers is experimental. The interface "
            "and behavior of this feature are subject to change.",
            category=FutureWarning,
        )
        if self._runs_task_group is None:
            raise RuntimeError("Worker not properly initialized")

        flow_run = await self._runs_task_group.start(
            partial(
                self._submit_adhoc_run,
                flow=flow,
                parameters=parameters,
                job_variables=job_variables,
            ),
        )
        return PrefectFlowRunFuture(flow_run_id=flow_run.id)

    async def _submit_adhoc_run(
        self,
        flow: "Flow[..., FR]",
        parameters: dict[str, Any] | None = None,
        job_variables: dict[str, Any] | None = None,
        task_status: anyio.abc.TaskStatus["FlowRun"] | None = None,
    ):
        """
        Submits a flow for the worker to kick off execution for.
        """
        from prefect._experimental.bundles import (
            aupload_bundle_to_storage,
            convert_step_to_command,
            create_bundle_for_flow_run,
        )

        if (
            self.work_pool.storage_configuration.bundle_upload_step is None
            or self.work_pool.storage_configuration.bundle_execution_step is None
        ):
            raise RuntimeError(
                f"Storage is not configured for work pool {self.work_pool.name!r}. "
                "Please configure storage for the work pool by running `prefect "
                "work-pool storage configure`."
            )

        from prefect.results import aresolve_result_storage, get_result_store

        current_result_store = get_result_store()
        # Check result storage and use the work pool default if needed
        if (
            current_result_store.result_storage is None
            or isinstance(current_result_store.result_storage, LocalFileSystem)
            and flow.result_storage is None
        ):
            if (
                self.work_pool.storage_configuration.default_result_storage_block_id
                is None
            ):
                self._logger.warning(
                    f"Flow {flow.name!r} has no result storage configured. Please configure "
                    "result storage for the flow if you want to retrieve the result for the flow run."
                )
            else:
                # Use the work pool's default result storage block for the flow run to ensure the caller can retrieve the result
                flow = flow.with_options(
                    result_storage=await aresolve_result_storage(
                        self.work_pool.storage_configuration.default_result_storage_block_id
                    ),
                    persist_result=True,
                )

        bundle_key = str(uuid.uuid4())
        upload_command = convert_step_to_command(
            self.work_pool.storage_configuration.bundle_upload_step,
            bundle_key,
            quiet=True,
        )
        execute_command = convert_step_to_command(
            self.work_pool.storage_configuration.bundle_execution_step, bundle_key
        )

        job_variables = (job_variables or {}) | {"command": " ".join(execute_command)}
        parameters = parameters or {}

        # Create a parent task run if this is a child flow run to ensure it shows up as a child flow in the UI
        parent_task_run = None
        if flow_run_ctx := FlowRunContext.get():
            parent_task = Task[Any, Any](
                name=flow.name,
                fn=flow.fn,
                version=flow.version,
            )
            parent_task_run = await parent_task.create_run(
                flow_run_context=flow_run_ctx,
                parameters=parameters,
            )

        flow_run = await self.client.create_flow_run(
            flow,
            parameters=flow.serialize_parameters(parameters),
            state=Pending(),
            job_variables=job_variables,
            work_pool_name=self.work_pool.name,
            tags=TagsContext.get().current_tags,
            parent_task_run_id=getattr(parent_task_run, "id", None),
        )
        if task_status is not None:
            # Emit the flow run object to .submit to allow it to return a future as soon as possible
            task_status.started(flow_run)
        # Avoid an API call to get the flow
        api_flow = APIFlow(id=flow_run.flow_id, name=flow.name, labels={})
        logger = self.get_flow_run_logger(flow_run)

        configuration = await self.job_configuration.from_template_and_values(
            base_job_template=self.work_pool.base_job_template,
            values=job_variables,
            client=self._client,
        )
        configuration.prepare_for_flow_run(
            flow_run=flow_run,
            flow=api_flow,
            work_pool=self.work_pool,
            worker_name=self.name,
        )

        bundle = create_bundle_for_flow_run(flow=flow, flow_run=flow_run)
        await aupload_bundle_to_storage(bundle, bundle_key, upload_command)

        logger.debug("Successfully uploaded execution bundle")

        try:
            # Call the implementation-specific run method with the constructed configuration. This is where the
            # rubber meets the road.
            try:
                await self._initiate_run(flow_run, configuration)
            except NotImplementedError:
                result = await self.run(flow_run, configuration)

                if result.status_code != 0:
                    await self._propose_crashed_state(
                        flow_run,
                        (
                            "Flow run infrastructure exited with non-zero status code"
                            f" {result.status_code}."
                        ),
                    )
        except Exception as exc:
            # This flow run was being submitted and did not start successfully
            logger.exception(
                f"Failed to submit flow run '{flow_run.id}' to infrastructure."
            )
            message = f"Flow run could not be submitted to infrastructure:\n{exc!r}"
            await self._propose_crashed_state(flow_run, message, client=self.client)

    @classmethod
    def __dispatch_key__(cls) -> str | None:
        if cls.__name__ == "BaseWorker":
            return None  # The base class is abstract
        return cls.type

    async def setup(self) -> None:
        """
        Prepare the worker to run.

        Creates the task group used for flow run submissions and, when a limit
        is configured, a capacity limiter. Opens an API client, enters the
        client and the task group on the worker's exit stack, synchronizes with
        the backend, and finally flags the worker as set up.

        Raises:
            ValueError: If `PREFECT_API_URL` has no value outside of test mode.
        """
        self._logger.debug("Setting up worker...")
        self._runs_task_group = anyio.create_task_group()

        limit = self._limit
        self._limiter = None if limit is None else anyio.CapacityLimiter(limit)

        if not (PREFECT_TEST_MODE or PREFECT_API_URL.value()):
            raise ValueError("`PREFECT_API_URL` must be set to start a Worker.")

        self._client = get_client()

        # Entered in this order so the exit stack unwinds the task group first
        # and the client last.
        for resource in (self._client, self._runs_task_group):
            await self._exit_stack.enter_async_context(resource)

        await self.sync_with_backend()

        self.is_setup = True

    async def teardown(self, *exc_info: Any) -> None:
        """
        Clean up resources after the worker is stopped.

        Marks the worker as no longer set up, cancels the scopes of scheduled
        background tasks, emits the worker stopped event (best effort), unwinds
        the exit stack, and clears the task group and client references.

        Args:
            *exc_info: Exception details forwarded to the exit stack's
                `__aexit__`.
        """
        self._logger.debug("Tearing down worker...")
        self.is_setup: bool = False
        for scheduled_scope in self._scheduled_task_scopes:
            scheduled_scope.cancel()

        # The stopped event has to go out before the exit stack closes the client.
        started_event = self._started_event
        if started_event:
            try:
                await self._emit_worker_stopped_event(started_event)
            except Exception:
                self._logger.exception("Failed to emit worker stopped event")

        await self._exit_stack.__aexit__(*exc_info)
        self._client = None
        self._runs_task_group = None

    def is_worker_still_polling(self, query_interval_seconds: float) -> bool:
        """
        Report whether the worker has polled for scheduled flow runs recently.

        This method is invoked by a webserver healthcheck handler and returns
        a boolean indicating if the worker has recorded a scheduled flow run
        poll within a variable amount of time.

        The `query_interval_seconds` is the same value that is used by the
        loop services - we evaluate whether `_last_polled_time` was within
        that interval x 30 (so 10s -> 5m).

        The instance property `self._last_polled_time` is currently
        set/updated in `get_and_submit_flow_runs()`.

        Args:
            query_interval_seconds: The polling interval, in seconds, that the
                threshold is derived from.

        Returns:
            True if the last recorded poll is within the threshold, False
            otherwise (an error is logged in that case).
        """
        threshold_seconds = query_interval_seconds * 30

        # Use `total_seconds()` rather than `.seconds`: the latter is only the
        # seconds component of the timedelta (0-86399), so gaps longer than a
        # day would wrap around and look recent, and a slightly negative delta
        # would look like almost a full day.
        elapsed = prefect.types._datetime.now("UTC") - self._last_polled_time
        seconds_since_last_poll = elapsed.total_seconds()

        is_still_polling = seconds_since_last_poll <= threshold_seconds

        if not is_still_polling:
            self._logger.error(
                f"Worker has not polled in the last {int(seconds_since_last_poll)} seconds "
                "and should be restarted"
            )

        return is_still_polling

    async def get_and_submit_flow_runs(self) -> list["FlowRun"]:
        """
        Poll for scheduled flow runs and submit them for execution.

        The time of the poll is recorded in `self._last_polled_time` once the
        scheduled runs have been retrieved.

        Returns:
            Whatever `_submit_scheduled_flow_runs` returns for the polled runs.
        """
        scheduled_runs = await self._get_scheduled_flow_runs()
        self._last_polled_time = prefect.types._datetime.now("UTC")
        submitted_runs = await self._submit_scheduled_flow_runs(
            flow_run_response=scheduled_runs
        )
        return submitted_runs

    async def _update_local_work_pool_info(self) -> None:
        """
        Refresh the worker's local copy of its work pool from the API.

        If the pool cannot be found it is created when
        `_create_pool_if_not_found` is set; otherwise a warning is logged and
        the local copy is left as it was. A warning is also logged when the
        pool's type differs from this worker's type, and a pool without a base
        job template is given the worker class's default template.
        """
        if TYPE_CHECKING:
            assert self._client is not None

        pool_name = self._work_pool_name
        try:
            work_pool = await self._client.read_work_pool(work_pool_name=pool_name)
        except ObjectNotFound:
            if not self._create_pool_if_not_found:
                self._logger.warning(f"Work pool {self._work_pool_name!r} not found!")
                if self._base_job_template is not None:
                    self._logger.warning(
                        "Ignoring supplied base job template because the work pool"
                        " already exists"
                    )
                return

            new_pool = WorkPoolCreate(name=pool_name, type=self.type)
            if self._base_job_template is not None:
                new_pool.base_job_template = self._base_job_template

            work_pool = await self._client.create_work_pool(work_pool=new_pool)
            self._logger.info(f"Work pool {self._work_pool_name!r} created.")

        # Only compare against the local worker type when the remote type
        # changed, or when the pool is being loaded for the first time.
        previously_seen_type = getattr(self._work_pool, "type", 0)
        if (
            previously_seen_type != work_pool.type
            and work_pool.type != self.__class__.type
        ):
            self._logger.warning(
                "Worker type mismatch! This worker process expects type "
                f"{self.type!r} but received {work_pool.type!r}"
                " from the server. Unexpected behavior may occur."
            )

        # A loaded pool must carry a `base_job_template`; fall back to the
        # worker's default and push it to the server when it is missing.
        if not work_pool.base_job_template:
            default_template = self.__class__.get_default_base_job_template()
            await self._set_work_pool_template(work_pool, default_template)
            work_pool.base_job_template = default_template

        self._work_pool = work_pool

    async def _worker_metadata(self) -> Optional[WorkerMetadata]:
        """
        Build metadata describing the Prefect collections installed alongside
        the worker.

        Returns:
            A `WorkerMetadata` listing each matching distribution's name and
            version, or None when no installed distribution matches a loaded
            collection.
        """
        installed_integrations = load_prefect_collections().keys()

        integration_versions = []
        for dist in distributions():
            metadata = dist.metadata  # pyright: ignore[reportOptionalMemberAccess]
            if not metadata:
                continue
            name = metadata.get("Name")
            if not name:
                continue
            # PyPI packages often use dashes, but Python package names use
            # underscores because they must be valid identifiers.
            if name.replace("-", "_") not in installed_integrations:
                continue
            integration_versions.append(
                Integration(name=metadata["Name"], version=dist.version)  # pyright: ignore[reportOptionalSubscript]
            )

        if not integration_versions:
            return None
        return WorkerMetadata(integrations=integration_versions)

    async def _send_worker_heartbeat(self) -> Optional[UUID]:
        """
        Send a heartbeat for this worker to the API.

        The heartbeat is skipped when the client or the work pool is not
        available. Against a Cloud server, worker metadata is attached until
        it has been flagged as sent. If the server answers 422 while a worker
        ID was requested, the heartbeat is retried once without requesting it.

        Returns:
            The value returned by the client's heartbeat call, or None when
            the heartbeat was skipped.

        Raises:
            httpx.HTTPStatusError: For any HTTP error other than the 422 case
                described above.
        """
        if not self._client:
            self._logger.warning("Client has not been initialized; skipping heartbeat.")
            return None
        if not self._work_pool:
            self._logger.debug("Worker has no work pool; skipping heartbeat.")
            return None

        should_get_worker_id = self._should_get_worker_id()

        params: dict[str, Any] = dict(
            work_pool_name=self._work_pool_name,
            worker_name=self.name,
            heartbeat_interval_seconds=self.heartbeat_interval_seconds,
            get_worker_id=should_get_worker_id,
        )

        metadata_pending = (
            self._client.server_type == ServerType.CLOUD
            and not self._worker_metadata_sent
        )
        if metadata_pending:
            worker_metadata = await self._worker_metadata()
            if worker_metadata:
                params["worker_metadata"] = worker_metadata
                self._worker_metadata_sent = True

        try:
            worker_id = await self._client.send_worker_heartbeat(**params)
        except httpx.HTTPStatusError as e:
            can_retry_without_id = (
                e.response.status_code == 422 and should_get_worker_id
            )
            if not can_retry_without_id:
                raise e
            self._logger.warning(
                "Failed to retrieve worker ID from the Prefect API server."
            )
            # Retry the same heartbeat, this time without asking for an ID.
            params["get_worker_id"] = False
            worker_id = await self._client.send_worker_heartbeat(**params)

        if should_get_worker_id and worker_id is None:
            self._logger.warning(
                "Failed to retrieve worker ID from the Prefect API server."
            )

        return worker_id

    async def sync_with_backend(self) -> None:
        """
        Update the worker's local information about its current work pool and
        send a worker heartbeat to the API.

        When the heartbeat yields a remote ID, it is stored as `backend_id`
        and the worker's logger is rebuilt.
        """
        await self._update_local_work_pool_info()

        remote_id = await self._send_worker_heartbeat()
        if remote_id:
            self.backend_id = remote_id
            self._logger = get_worker_logger(self)

        message = "Worker synchronized with the Prefect API server. "
        if self.backend_id:
            message += f"Remote ID: {self.backend_id}"
        self._logger.debug(message)

    def _should_get_worker_id(self):
        """Determines if the worker should request an ID from the API server."""
        return (
            self._client
            and self._client.server_type == ServerType.CLOUD
            and self.backend_id is None
        )

    async def _get_scheduled_flow_runs(
        self,
    ) -> list["WorkerFlowRunResponse"]:
        """
        Retrieve scheduled flow runs from the work pool's queues.

        Runs scheduled up to `_prefetch_seconds` (truncated to whole seconds)
        from now are requested.

        Returns:
            The responses returned by the API, or an empty list when the work
            pool does not exist.
        """
        prefetch_window = datetime.timedelta(seconds=int(self._prefetch_seconds))
        scheduled_before = prefect.types._datetime.now("UTC") + prefetch_window
        self._logger.debug(
            f"Querying for flow runs scheduled before {scheduled_before}"
        )

        try:
            scheduled_flow_runs = (
                await self.client.get_scheduled_flow_runs_for_work_pool(
                    work_pool_name=self._work_pool_name,
                    scheduled_before=scheduled_before,
                    work_queue_names=list(self._work_queues),
                )
            )
        except ObjectNotFound:
            # the pool doesn't exist; it will be created on the next
            # heartbeat (or an appropriate warning will be logged)
            return []

        self._logger.debug(
            f"Discovered {len(scheduled_flow_runs)} scheduled_flow_runs"
        )
        return scheduled_flow_runs

    async def _submit_scheduled_flow_runs(
        self, flow_run_response: list["WorkerFlowRunResponse"]
    ) -> list["FlowRun"]:
        """
        Start submission of the flow runs referenced by the given responses.

        Runs that are already being submitted are skipped. When a limiter is
        configured, a slot is acquired on behalf of each run; as soon as no
        slot is available the remaining runs are left for a later poll.

        Args:
            flow_run_response: The scheduled flow run responses to process.

        Returns:
            The flow runs from the responses whose IDs are in the set of runs
            currently being submitted.
        """
        candidate_runs = [response.flow_run for response in flow_run_response]

        for flow_run in candidate_runs:
            if flow_run.id in self._submitting_flow_run_ids:
                self._logger.debug(
                    f"Skipping {flow_run.id} because it's already being submitted"
                )
                continue

            if self._limiter:
                try:
                    self._limiter.acquire_on_behalf_of_nowait(flow_run.id)
                except anyio.WouldBlock:
                    self._logger.debug(
                        f"Flow run limit reached; {self.limiter.borrowed_tokens} flow runs"
                        " in progress."
                    )
                    break

            run_logger = self.get_flow_run_logger(flow_run)
            run_logger.info(
                f"Worker '{self.name}' submitting flow run '{flow_run.id}'"
            )
            if self.backend_id:
                try:
                    worker_url = url_for(
                        "worker",
                        obj_id=self.backend_id,
                        work_pool_name=self._work_pool_name,
                    )

                    run_logger.info(
                        f"Running on worker id: {self.backend_id}. See worker logs here: {worker_url}"
                    )
                except ValueError as ve:
                    run_logger.warning(f"Failed to generate worker URL: {ve}")

            self._submitting_flow_run_ids.add(flow_run.id)
            if TYPE_CHECKING:
                assert self._runs_task_group is not None
            self._runs_task_group.start_soon(self._submit_run, flow_run)

        return [
            run for run in candidate_runs if run.id in self._submitting_flow_run_ids
        ]

    async def _submit_run(self, flow_run: "FlowRun") -> None:
        """
        Submit a given flow run for execution by the worker.

        If the flow run references a deployment that no longer exists, the run
        is marked as cancelled instead of being submitted. Otherwise a
        `Pending` state is proposed and, when accepted, the run is started via
        `_submit_run_and_capture_errors` in the worker's task group. A truthy,
        non-exception readiness value is stored on the flow run as its
        `infrastructure_pid`.

        Every path that does not end with a started run releases the limit
        slot held on behalf of the flow run, and the run is removed from the
        set of runs currently being submitted.

        Args:
            flow_run: The flow run to submit.
        """
        run_logger = self.get_flow_run_logger(flow_run)

        if flow_run.deployment_id:
            try:
                await self.client.read_deployment(flow_run.deployment_id)
            except ObjectNotFound:
                self._logger.exception(
                    f"Deployment {flow_run.deployment_id} no longer exists. "
                    f"Flow run {flow_run.id} will not be submitted for"
                    " execution"
                )
                self._submitting_flow_run_ids.remove(flow_run.id)
                # A limit slot may be held on behalf of this run; free it
                # before returning, otherwise it stays borrowed and
                # permanently reduces the worker's capacity.
                self._release_limit_slot(flow_run.id)
                await self._mark_flow_run_as_cancelled(
                    flow_run,
                    state_updates=dict(
                        message=f"Deployment {flow_run.deployment_id} no longer exists, cancelled run."
                    ),
                )
                return

        ready_to_submit = await self._propose_pending_state(flow_run)
        self._logger.debug(f"Ready to submit {flow_run.id}: {ready_to_submit}")
        if ready_to_submit:
            if TYPE_CHECKING:
                assert self._runs_task_group is not None
            readiness_result = await self._runs_task_group.start(
                self._submit_run_and_capture_errors, flow_run
            )

            if readiness_result and not isinstance(readiness_result, Exception):
                try:
                    await self.client.update_flow_run(
                        flow_run_id=flow_run.id,
                        infrastructure_pid=str(readiness_result),
                    )
                except Exception:
                    run_logger.exception(
                        "An error occurred while setting the `infrastructure_pid` on "
                        f"flow run {flow_run.id!r}. The flow run will "
                        "not be cancellable."
                    )

                run_logger.info(f"Completed submission of flow run '{flow_run.id}'")

            else:
                # If the run is not ready to submit, release the concurrency slot
                self._release_limit_slot(flow_run.id)
        else:
            self._release_limit_slot(flow_run.id)
        self._submitting_flow_run_ids.remove(flow_run.id)

    async def _submit_run_and_capture_errors(
        self,
        flow_run: "FlowRun",
        task_status: anyio.abc.TaskStatus[int | Exception] | None = None,
    ) -> BaseWorkerResult | Exception:
        """
        Run the given flow run on infrastructure, capturing any error raised.

        Builds the job configuration, emits the submitted event, labels the
        flow run with this worker's labels, and awaits `self.run`. Exceptions
        are logged and returned rather than raised. The limit slot held for the
        flow run is released once this attempt finishes, whether it succeeded
        or not.

        Args:
            flow_run: The flow run to execute.
            task_status: Optional task status used to report startup back to
                the caller; it receives the exception when submission fails
                before startup was reported.

        Returns:
            The result returned by `self.run`, or the exception that was
            raised while preparing or running the flow run.
        """
        run_logger = self.get_flow_run_logger(flow_run)

        try:
            configuration = await self._get_configuration(flow_run)
            submitted_event = self._emit_flow_run_submitted_event(configuration)
            await self._give_worker_labels_to_flow_run(flow_run.id)

            result = await self.run(
                flow_run=flow_run,
                task_status=task_status,
                configuration=configuration,
            )
        except Exception as exc:
            # Startup has not been reported yet, so the failure happened during
            # submission rather than while monitoring a started run.
            if task_status and not getattr(task_status, "_future").done():
                # This flow run was being submitted and did not start successfully
                run_logger.exception(
                    f"Failed to submit flow run '{flow_run.id}' to infrastructure."
                )
                # Mark the task as started to prevent agent crash
                task_status.started(exc)
                message = f"Flow run could not be submitted to infrastructure:\n{exc!r}"
                await self._propose_crashed_state(flow_run, message)
            else:
                run_logger.exception(
                    f"An error occurred while monitoring flow run '{flow_run.id}'. "
                    "The flow run will not be marked as failed, but an issue may have "
                    "occurred."
                )
            return exc
        finally:
            # Runs on both the success and the error path.
            self._release_limit_slot(flow_run.id)

        # `self.run` returned normally but never reported startup.
        if task_status and not getattr(task_status, "_future").done():
            run_logger.error(
                f"Infrastructure returned without reporting flow run '{flow_run.id}' "
                "as started or raising an error. This behavior is not expected and "
                "generally indicates improper implementation of infrastructure. The "
                "flow run will not be marked as failed, but an issue may have occurred."
            )
            # Mark the task as started to prevent agent crash
            task_status.started(
                RuntimeError(
                    "Infrastructure returned without reporting flow run as started or raising an error."
                )
            )

        # A non-zero exit status is reported as a crash.
        if result.status_code != 0:
            await self._propose_crashed_state(
                flow_run,
                (
                    "Flow run infrastructure exited with non-zero status code"
                    f" {result.status_code}."
                ),
            )

        # Only emit the executed event when a submitted event was produced.
        if submitted_event:
            self._emit_flow_run_executed_event(result, configuration, submitted_event)

        return result

    def _release_limit_slot(self, flow_run_id: UUID) -> None:
        """
        Frees up a slot taken by the given flow run id.

        This method gracefully handles cases where the slot has already been released
        to prevent worker crashes from double-release scenarios.
        """
        if self._limiter:
            try:
                self._limiter.release_on_behalf_of(flow_run_id)
                self._logger.debug("Limit slot released for flow run '%s'", flow_run_id)
            except RuntimeError:
                # Slot was already released - this can happen in certain error paths
                # where multiple cleanup attempts occur. Log it but don't crash.
                self._logger.debug(
                    "Limit slot for flow run '%s' was already released", flow_run_id
                )

    def get_status(self) -> dict[str, Any]:
        """
        Describe the current worker.

        Returns:
            A dictionary with the worker's name, a JSON-mode dump of its work
            pool (or None when no work pool is loaded), and its local settings.
        """
        status: dict[str, Any] = {"name": self.name}

        work_pool = self._work_pool
        status["work_pool"] = (
            None if work_pool is None else work_pool.model_dump(mode="json")
        )
        status["settings"] = {"prefetch_seconds": self._prefetch_seconds}
        return status

    async def _get_configuration(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
    ) -> C:
        """
        Build the job configuration for the given flow run.

        Job variables from the deployment are overridden by the flow run's job
        variables. The one exception is `env`: when the deployment's `env` is
        a dict, the flow run's `env` is merged into it instead of replacing it.
        The merge is done on copies, so neither the deployment's nor the flow
        run's job variables are modified.

        Args:
            flow_run: The flow run to build a configuration for.
            deployment: The flow run's deployment. When omitted and the flow
                run has a `deployment_id`, the deployment is read from the API.

        Returns:
            The job configuration, prepared for the flow run.
        """
        if not deployment and flow_run.deployment_id:
            deployment = await self.client.read_deployment(flow_run.deployment_id)

        flow = await self.client.read_flow(flow_run.flow_id)

        deployment_vars = getattr(deployment, "job_variables", {}) or {}
        flow_run_vars = flow_run.job_variables or {}
        job_variables = dict(deployment_vars)

        # merge environment variables carefully, otherwise full override
        deployment_env = job_variables.get("env")
        if isinstance(deployment_env, dict):
            # Merge into a fresh dict: updating `deployment_env` in place would
            # modify the deployment object, and popping `env` from
            # `flow_run_vars` would strip it from the flow run.
            merged_env = dict(deployment_env)
            merged_env.update(flow_run_vars.get("env", {}))
            job_variables.update(flow_run_vars)
            job_variables["env"] = merged_env
        else:
            job_variables.update(flow_run_vars)

        configuration = await self.job_configuration.from_template_and_values(
            base_job_template=self.work_pool.base_job_template,
            values=job_variables,
            client=self.client,
        )
        try:
            configuration.prepare_for_flow_run(
                flow_run=flow_run,
                deployment=deployment,
                flow=flow,
                work_pool=self.work_pool,
                worker_name=self.name,
            )
        except TypeError:
            warnings.warn(
                "This worker is missing the `work_pool` and `worker_name` arguments "
                "in its JobConfiguration.prepare_for_flow_run method. Please update "
                "the worker's JobConfiguration  class to accept these arguments to "
                "avoid this warning.",
                category=PrefectDeprecationWarning,
            )
            # Handle older subclasses that don't accept work_pool and worker_name
            configuration.prepare_for_flow_run(
                flow_run=flow_run, deployment=deployment, flow=flow
            )
        return configuration

    async def _propose_pending_state(self, flow_run: "FlowRun") -> bool:
        """
        Propose a `Pending` state for the flow run.

        Returns:
            True when the server answered with a pending state. False when the
            server aborted the transition, the proposal raised, or the
            returned state is not pending; each case is logged.
        """
        run_logger = self.get_flow_run_logger(flow_run)

        try:
            state = await propose_state(self.client, Pending(), flow_run_id=flow_run.id)
        except Abort as exc:
            run_logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}'. "
                    f"Server sent an abort signal: {exc}"
                ),
            )
            return False
        except Exception:
            run_logger.exception(
                f"Failed to update state of flow run '{flow_run.id}'",
            )
            return False

        if state.is_pending():
            return True

        run_logger.info(
            (
                f"Aborted submission of flow run '{flow_run.id}': "
                f"Server returned a non-pending state {state.type.value!r}"
            ),
        )
        return False

    async def _propose_failed_state(self, flow_run: "FlowRun", exc: Exception) -> None:
        """
        Propose a failed state, built from `exc`, for the flow run.

        An abort from the server is ignored; any other error raised while
        proposing the state is logged and not re-raised.
        """
        run_logger = self.get_flow_run_logger(flow_run)
        try:
            failed_state = await exception_to_failed_state(
                message="Submission failed.", exc=exc
            )
            await propose_state(self.client, failed_state, flow_run_id=flow_run.id)
        except Abort:
            # We've already failed, no need to note the abort but we don't want it to
            # raise in the agent process
            pass
        except Exception:
            run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'")

    async def _propose_crashed_state(
        self, flow_run: "FlowRun", message: str, client: PrefectClient | None = None
    ) -> None:
        """
        Propose a `Crashed` state with the given message for the flow run.

        Args:
            flow_run: The flow run to report as crashed.
            message: The message attached to the crashed state.
            client: Client to use for the proposal; falls back to the worker's
                own client when not given.

        Errors raised while proposing the state are handled here and never
        propagate to the caller.
        """
        run_logger = self.get_flow_run_logger(flow_run)
        try:
            state = await propose_state(
                client or self.client,
                Crashed(message=message),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # Flow run already marked as failed
            return
        except ObjectNotFound:
            # Flow run was deleted - log it but don't crash the worker
            run_logger.debug(
                f"Flow run '{flow_run.id}' was deleted before state could be updated"
            )
            return
        except Exception:
            run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'")
            return

        if state.is_crashed():
            run_logger.info(f"Reported flow run '{flow_run.id}' as crashed: {message}")

    async def _mark_flow_run_as_cancelled(
        self, flow_run: "FlowRun", state_updates: dict[str, Any] | None = None
    ) -> None:
        """
        Force the flow run into a cancelled state.

        Args:
            flow_run: The flow run to cancel.
            state_updates: Optional fields to apply to the cancelled state. The
                mapping is copied, so the caller's dict is not modified. The
                name defaults to "Cancelled".

        A flow run that has been deleted in the meantime is logged and
        otherwise ignored. Ten minutes later the flow run's id is dropped from
        `_cancelling_flow_run_ids`.
        """
        # Copy so the defaults applied below never leak into the caller's dict.
        state_updates = dict(state_updates or {})
        state_updates.setdefault("name", "Cancelled")

        if flow_run.state:
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.model_copy(update=state_updates)
        else:
            # Unexpectedly when flow run does not have a state, create a new one
            # does not need to explicitly set the type
            state = Cancelled(**state_updates)

        try:
            await self.client.set_flow_run_state(flow_run.id, state, force=True)
        except ObjectNotFound:
            # Flow run was deleted - log it but don't crash the worker
            run_logger = self.get_flow_run_logger(flow_run)
            run_logger.debug(
                f"Flow run '{flow_run.id}' was deleted before it could be marked as cancelled"
            )

        def _forget_cancelling_flow_run() -> None:
            # The id is not guaranteed to be tracked when this fires; an
            # uncaught lookup error here would surface inside the worker's task
            # group, so a missing id is deliberately ignored.
            # NOTE(review): confirm which code path adds ids to
            # `_cancelling_flow_run_ids`.
            try:
                self._cancelling_flow_run_ids.remove(flow_run.id)
            except (KeyError, ValueError):
                pass

        # Do not remove the flow run from the cancelling set immediately because
        # the API caches responses for the `read_flow_runs` and we do not want to
        # duplicate cancellations.
        await self._schedule_task(60 * 10, _forget_cancelling_flow_run)

    async def _set_work_pool_template(
        self, work_pool: "WorkPool", job_template: dict[str, Any]
    ):
        """
        Update the `base_job_template` of the given work pool on the server.

        Args:
            work_pool: The work pool to update; only its name is used.
            job_template: The base job template to store.
        """
        client = self.client
        template_update = WorkPoolUpdate(base_job_template=job_template)
        await client.update_work_pool(
            work_pool_name=work_pool.name,
            work_pool=template_update,
        )

    async def _schedule_task(
        self, __in_seconds: int, fn: Callable[..., Any], *args: Any, **kwargs: Any
    ):
        """
        Schedule a background task to start after some time.

        These tasks will be run immediately when the worker exits instead of waiting.

        The function may be async or sync. Async functions will be awaited.

        Args:
            __in_seconds: Number of seconds to wait before calling `fn`.
            fn: The callable to run.
            *args: Positional arguments passed to `fn`.
            **kwargs: Keyword arguments passed to `fn`.

        Raises:
            RuntimeError: If the worker's task group has not been created.
        """
        if not self._runs_task_group:
            raise RuntimeError(
                "Worker has not been correctly initialized. Please use the worker class as an async context manager."
            )

        async def wrapper(task_status: anyio.abc.TaskStatus[Any]):
            # If we are shutting down, do not sleep; otherwise sleep until the scheduled
            # time or shutdown
            if self.is_setup:
                # Register the scope so it can be cancelled from outside,
                # which ends the sleep early.
                with anyio.CancelScope() as scope:
                    self._scheduled_task_scopes.add(scope)
                    task_status.started()
                    await anyio.sleep(__in_seconds)

                self._scheduled_task_scopes.remove(scope)
            else:
                task_status.started()

            # Runs after the sleep completed or was cancelled via the scope.
            result = fn(*args, **kwargs)
            if asyncio.iscoroutine(result):
                await result

        # The wrapper calls `task_status.started()` before it sleeps, so the
        # delay itself happens in the background of the task group.
        await self._runs_task_group.start(wrapper)

    async def _give_worker_labels_to_flow_run(self, flow_run_id: UUID):
        """
        Give this worker's identifying labels to the specified flow run.

        The labels carry the worker's name and type and, when a work pool is
        loaded, the work pool's name and id. Nothing is sent when the worker
        has no client.
        """
        if not self._client:
            return

        labels: KeyValueLabels = {
            "prefect.worker.name": self.name,
            "prefect.worker.type": self.type,
        }

        if self._work_pool:
            labels["prefect.work-pool.name"] = self._work_pool.name
            labels["prefect.work-pool.id"] = str(self._work_pool.id)

        await self._client.update_flow_run_labels(flow_run_id, labels)

    async def __aenter__(self) -> Self:
        """Set the worker up on entering its async context and return it."""
        self._logger.debug("Entering worker context...")
        await self.setup()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        """
        Tear the worker down on leaving its async context.

        An exception group raised by teardown that wraps exactly one exception
        is unwrapped and that exception is raised on its own; any other group
        is re-raised unchanged.
        """
        try:
            self._logger.debug("Exiting worker context...")
            await self.teardown(*exc_info)
        except (ExceptionGroup, BaseExceptionGroup) as exc:
            # For less verbose tracebacks
            if len(exc.exceptions) != 1:
                raise
            raise exc.exceptions[0] from None

    def __repr__(self) -> str:
        return f"Worker(pool={self._work_pool_name!r}, name={self.name!r})"

    def _event_resource(self):
        """
        Build the event resource describing this worker.

        Returns:
            A dictionary with the worker's resource id (derived from its type
            and name slug), its name, the Prefect version, and its type.
        """
        worker_type = self.type
        resource_id = f"prefect.worker.{worker_type}.{self.get_name_slug()}"
        return {
            "prefect.resource.id": resource_id,
            "prefect.resource.name": self.name,
            "prefect.version": prefect.__version__,
            "prefect.worker-type": worker_type,
        }

    def _event_related_resources(
        self,
        configuration: BaseJobConfiguration | None = None,
        include_self: bool = False,
    ) -> list[RelatedResource]:
        """
        Collect the resources related to an event emitted by this worker.

        Args:
            configuration: When given, its related resources come first.
            include_self: When true, the worker itself is appended with the
                role "worker".

        Returns:
            The related resources: those of the configuration, then the work
            pool (when one is loaded), then optionally the worker.
        """
        related: list[RelatedResource] = []

        if configuration:
            related.extend(getattr(configuration, "_related_resources")())

        if self._work_pool:
            work_pool_resource = object_as_related_resource(
                kind="work-pool", role="work-pool", object=self._work_pool
            )
            related.append(work_pool_resource)

        if include_self:
            own_resource = self._event_resource()
            own_resource["prefect.resource.role"] = "worker"
            related.append(RelatedResource.model_validate(own_resource))

        return related

    def _emit_flow_run_submitted_event(
        self, configuration: BaseJobConfiguration
    ) -> Event | None:
        """
        Emit the "prefect.worker.submitted-flow-run" event for this worker.

        Returns:
            Whatever `emit_event` returns.
        """
        resource = self._event_resource()
        related = self._event_related_resources(configuration=configuration)
        return emit_event(
            event="prefect.worker.submitted-flow-run",
            resource=resource,
            related=related,
        )

    def _emit_flow_run_executed_event(
        self,
        result: BaseWorkerResult,
        configuration: BaseJobConfiguration,
        submitted_event: Event | None = None,
    ):
        """
        Emit the "prefect.worker.executed-flow-run" event for this worker.

        Related resources with the role "flow-run" are annotated with the
        infrastructure identifier and status code taken from `result`.

        Args:
            result: The result of running the flow run on infrastructure.
            configuration: The job configuration the run was executed with.
            submitted_event: The earlier submitted event this event follows.
        """
        related = self._event_related_resources(configuration=configuration)

        flow_run_resources = (
            candidate for candidate in related if candidate.role == "flow-run"
        )
        for flow_run_resource in flow_run_resources:
            flow_run_resource["prefect.infrastructure.identifier"] = str(
                result.identifier
            )
            flow_run_resource["prefect.infrastructure.status-code"] = str(
                result.status_code
            )

        emit_event(
            event="prefect.worker.executed-flow-run",
            resource=self._event_resource(),
            related=related,
            follows=submitted_event,
        )

    async def _emit_worker_started_event(self) -> Event | None:
        """
        Emit the "prefect.worker.started" event for this worker.

        Returns:
            Whatever `emit_event` returns.
        """
        resource = self._event_resource()
        related = self._event_related_resources()
        return emit_event("prefect.worker.started", resource=resource, related=related)

    async def _emit_worker_stopped_event(self, started_event: Event):
        """
        Emit the "prefect.worker.stopped" event for this worker.

        Args:
            started_event: The worker's started event, which this event follows.
        """
        resource = self._event_resource()
        related = self._event_related_resources()
        emit_event(
            "prefect.worker.stopped",
            resource=resource,
            related=related,
            follows=started_event,
        )
